/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.codec.http.server.httpserver;

import kiss.logger;
import collie.codec.http.session.httpsession;
import collie.codec.http.httptansaction;
import collie.codec.http.server.httpserveroptions;
import collie.codec.http.httpmessage;
import collie.codec.http.server.requesthandler;
import collie.codec.http.codec.httpcodec;
import collie.codec.http.httptansaction;
import collie.bootstrap.server;
import kiss.container.Vector;
import collie.channel;
import kiss.net.TcpStream;
import kiss.event;
import kiss.net.TcpListener;
import kiss.event.socket;

import collie.net.server.tcpserver;
import collie.net.server.connection;
import collie.bootstrap.exception;
import collie.bootstrap.exception;
import collie.bootstrap.serversslconfig;

import std.socket;
import std.parallelism;

alias HTTPPipeline = Pipeline!(const(ubyte[]), StreamWriteBuffer);
/// HTTP server built on `ServerBootstrap!HTTPPipeline`.
alias HTTPServer = HTTPServerImpl!true;
/// HTTP server built directly on `TCPServer`.
alias HttpServer = HTTPServerImpl!false;

/**
 * HTTP server front-end that owns the listening servers and the event loops
 * they run on, and acts as the `HTTPSessionController` for every session.
 *
 * Params:
 *   UsePipeline = when true each listener is a `ServerBootstrap!HTTPPipeline`;
 *                 when false each listener is a plain `TCPServer`.
 */
final class HTTPServerImpl(bool UsePipeline) : HTTPSessionController
{
    static if(UsePipeline){
        alias Server = ServerBootstrap!HTTPPipeline;
    } else {
        alias Server = TCPServer;
    }
    alias SVector = Vector!(Server);
    alias IPVector = Vector!(HTTPServerOptions.IPConfig);

    /**
     * Creates the main event loop and, if more than one thread is requested,
     * a worker `EventLoopGroup`.
     *
     * Params:
     *   options = server configuration; stored and consulted later for
     *             timeouts, header limits, handler factories and SSL.
     *
     * Throws: `SSLException` (only with `version(USE_SSL)`) when an SSL
     *         configuration is present but no context can be generated.
     */
    this(HTTPServerOptions options)
    {
        version(USE_SSL){
            if(options.ssLConfig){
                _ssl_Ctx = options.ssLConfig.generateSSLCtx();
                if(_ssl_Ctx is null)
                    throw new SSLException("can not generate SSL_Ctx!");
            }
        }
        _options = options;
        _mainLoop = new EventLoop();
        // One thread is the main loop itself; the remainder go to the group.
        // NOTE(review): if `threads` is 0 this subtraction wraps around and the
        // cap below turns it into `totalCPUs - 1` — confirm that is intended.
        size_t thread = _options.threads - 1;
        if(thread > 0) {
            if(thread>totalCPUs)
                thread = totalCPUs-1;
            _group = new EventLoopGroup(cast(uint)thread);
        }

    }

    /**
     * Replaces the full set of bind addresses. Does nothing once the server
     * has been started.
     *
     * Every existing listener is stopped/closed and dropped, then one new
     * listener set is created per entry of `addrs`.
     */
    void bind(ref IPVector addrs)
    {
        if(_isStart) return;
        _ipconfigs = addrs;
        for(size_t i = 0; i < _servers.length; ++i)
        {
            // NOTE(review): the message says "start" but this loop shuts the
            // previous listeners down.
            logDebug("start listen!!!");
            static if(UsePipeline)
                _servers[i].stopListening();
            else
                _servers[i].close();
        }
        _servers.clear();
        for(size_t i = 0; i < _ipconfigs.length; ++i)
        {
            newServer(_ipconfigs[i]);
        }
    }

    /**
     * Adds one more bind address on top of the existing ones. Does nothing
     * once the server has been started.
     */
    void addBind(ref HTTPServerOptions.IPConfig addr)
    {
    //  logDebug("",_isStart);
        if(_isStart) return;
        newServer(addr);
        _ipconfigs.insertBack(addr);
    }

    /**
     * Starts listening on every configured server, starts the worker group
     * (if any) and then runs the main event loop. A second call while the
     * started flag is set is ignored.
     */
    void start()
    {
    //  logDebug("start ",_isStart);
        if(_isStart) return;
        _isStart = true;
        for(size_t i = 0; i < _servers.length; ++i)
        {
            static if(UsePipeline)
                _servers[i].startListening();
            else {
                Server ser = _servers[i];
                ser.startTimeout(cast(uint)_options.timeOut);
                ser.listen(1024);
            }
        }
        if(_group)
            _group.start();
        _mainLoop.run();
    }

    /**
     * Stops the worker group (if any) and the main event loop. Ignored when
     * the server was never started.
     *
     * NOTE(review): the started flag is not cleared here, so `bind`,
     * `addBind` and `start` stay no-ops afterwards — confirm this is intended.
     */
    void stop()
    {
        if(!_isStart) return;
        if(_group)
            _group.stop();
        _mainLoop.stop();
    }

    /// The bind configurations currently registered.
    ref const(IPVector) addresses() const{ return _ipconfigs;}
    /// The main event loop created by the constructor.
    EventLoop eventLoop(){return _mainLoop;}
    /// The worker loop group, or null when only one thread was requested.
    EventLoopGroup group(){return _group;}
    /// The listeners created so far.
    ref const(SVector) servers(){return _servers;}
protected:
    /**
     * Builds the transaction handler for an incoming message.
     *
     * Each configured handler factory is called in order with the handler
     * produced by the previous one (initially null) and the message. If the
     * final result is null, null is returned; otherwise the handler is
     * wrapped in a `RequestHandlerAdaptor` bound to `txn`.
     */
    override HTTPTransactionHandler getRequestHandler(HTTPTransaction txn, HTTPMessage msg)
    {/* will run in Multi-thread */
        RequestHandler req = null;
        for(size_t i = 0; i < _options.handlerFactories.length; ++i)
        {
            req = _options.handlerFactories[i](req,msg);
        }
        if(req is null)
            return null;
        RequestHandlerAdaptor ada = new RequestHandlerAdaptor(req);
        ada.setTransaction(txn);
        return ada;
    }

    override void attachSession(HTTPSession session){/* will run in Multi-thread */}

    override void detachSession(HTTPSession session){/* will run in Multi-thread */}

    override void onSessionCodecChange(HTTPSession session){/* will run in Multi-thread */}

    /// Maximum header size taken from the options, narrowed to `uint`.
    uint maxHeaderSize() const shared {return cast(uint)_options.maxHeaderSize;}

    static if(UsePipeline){
        /**
         * Applies per-address socket options to a freshly created acceptor.
         * Only has an effect on Linux with TCP Fast Open enabled in `config`.
         */
        static void setAcceptorConfig(ref shared(HTTPServerOptions.IPConfig) config,TcpListener acceptor)
        {
            version(linux) {
                if(config.enableTCPFastOpen){
                    // 23 is the raw option number used for TCP fast open on Linux.
                    acceptor.setOption(SocketOptionLevel.TCP,cast(SocketOption)23,config.fastOpenQueueSize);
                }
            }
        }
    }

    /**
     * Creates the listener(s) for one bind configuration and appends them to
     * `_servers`.
     *
     * Pipeline mode creates a single bootstrap server. TCP mode creates one
     * `TCPServer` on the main loop plus, when a worker group exists, one per
     * loop in the group (all with port reuse enabled).
     */
    void newServer(HTTPServerOptions.IPConfig ipconfig )
    {
        static if(UsePipeline){
            Server ser = new Server(_mainLoop);
            if(_group)
                ser.setReusePort(true);
            ser.group(_group).childPipeline(new shared ServerHandlerFactory(this));
            version(USE_SSL){
                if(_options.ssLConfig)
                    ser.setSSLConfig(_options.ssLConfig);
            }
            ser.pipeline(new shared ServerAccpeTFactory(ipconfig));
            ser.heartbeatTimeOut(cast(uint)_options.timeOut);
            ser.bind(ipconfig.address);
            logDebug("binding on: ", ipconfig.address.toString());
            _servers.insertBack(ser);
        } else {
            bool ruseport = _group !is null;
            _servers.insertBack(newTCPServer(_mainLoop,ipconfig.address,ruseport,ipconfig.enableTCPFastOpen,ipconfig.fastOpenQueueSize));
            if(ruseport){
                foreach(EventLoop loop; _group){
                    _servers.insertBack(newTCPServer(loop,ipconfig.address,ruseport,ipconfig.enableTCPFastOpen,ipconfig.fastOpenQueueSize));
                }
            }

        }
    }
    static if(!UsePipeline){
        /**
         * Creates a `TCPServer` on `loop`, wires `newConnect` as its
         * new-connection callback and binds it to `address`.
         *
         * Params:
         *   loop              = event loop the server runs on.
         *   address           = address to bind.
         *   ruseport          = enable port reuse on the listener; when false
         *                       the listener gets exclusive address use on
         *                       Windows and `REUSEADDR` elsewhere.
         *   enableTCPFastOpen = on Linux, enable TCP Fast Open.
         *   fastOpenQueueSize = value passed for the fast-open option.
         */
        Server newTCPServer(EventLoop loop,Address address,bool ruseport, bool enableTCPFastOpen, uint fastOpenQueueSize )
        {
            Server ser = new Server(loop);
            ser.setNewConntionCallBack(&newConnect);
            logDebug("binding on: ", address.toString());

            ser.bind(address,(TcpListener accpet) @trusted {
                    if(ruseport)
                        accpet.reusePort(true);
                    else {
                        // Predefined version identifiers are case-sensitive:
                        // the Windows one is `Windows`. The lowercase spelling
                        // never matched, so this branch was dead code.
                        version(Windows){
                            import core.sys.windows.winsock2;
                            accpet.setOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_EXCLUSIVEADDRUSE,true);
                        } else {
                            accpet.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
                        }
                    }
                    version(linux) {
                        if(enableTCPFastOpen){
                            // 23 is the raw option number used for TCP fast open on Linux.
                            accpet.setOption(SocketOptionLevel.TCP,cast(SocketOption)23,fastOpenQueueSize);
                        }
                    }
                });
            return ser;
        }
    }


    /**
     * New-connection callback for TCP mode: wraps the accepted stream in an
     * `HttpHandlerConnection` using a downstream HTTP/1.x codec limited to
     * the configured maximum header size.
     */
    ServerConnection newConnect(TcpListener sender, TcpStream stream)
    {
        return new HttpHandlerConnection(stream,this,
            new HTTP1XCodec(TransportDirection.DOWNSTREAM,cast(uint)_options.maxHeaderSize));
    }
private:
    SVector _servers;
    EventLoop _mainLoop;
    EventLoopGroup _group = null;


    HTTPServerOptions _options;
    IPVector _ipconfigs;
    SSL_CTX * _ssl_Ctx = null;

    bool _isStart = false;
}


private:

import collie.codec.http.codec.http1xcodec;
import collie.codec.http.session.httpdownstreamsession;
import collie.codec.http.session.sessiondown;

/// Connection used in TCP mode; attaches a downstream HTTP session to the socket.
class HttpHandlerConnection : HTTPConnection
{
    this(TcpStream sock,HTTPSessionController controller,HTTPCodec codec)
    {
        super(sock);
        httpSession = new HTTPDownstreamSession(controller,codec,this);
    }
}

/// Pipeline handler used in pipeline mode; attaches a downstream HTTP session.
class HttpHandlerPipeline : PipelineSessionDown
{
    this(HTTPSessionController controller,HTTPCodec codec)
    {
        httpSession(new HTTPDownstreamSession(controller,codec,this));
    }
}

/// Builds the per-connection pipeline: socket handler followed by the HTTP handler.
class ServerHandlerFactory : PipelineFactory!HTTPPipeline
{
    this(HTTPServer server)
    {
        _server = cast(typeof(_server))server;
    }
    override HTTPPipeline newPipeline(TcpStream transport) {
        auto pipe = HTTPPipeline.create();
        pipe.addBack(new TCPSocketHandler(transport));
        pipe.addBack(new HttpHandlerPipeline(cast(HTTPServer)_server,
                new HTTP1XCodec(TransportDirection.DOWNSTREAM,_server.maxHeaderSize)));
        pipe.finalize();
        return pipe;
    }

private:
    HTTPServer _server;
}

/// Builds the accept pipeline and applies the bind configuration to the acceptor.
class ServerAccpeTFactory : AcceptPipelineFactory
{
    this(HTTPServerOptions.IPConfig config)
    {
        _conf = cast(typeof(_conf))config;
    }

    override AcceptPipeline newPipeline(TcpListener acceptor) {
        logDebug("--new accpetPipeLine");
        AcceptPipeline pipe = AcceptPipeline.create();
        HTTPServer.setAcceptorConfig(_conf,acceptor);
        return pipe;
    }

private:
    HTTPServerOptions.IPConfig _conf;
}